package com.yu.boot2.rocket;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * @author zy
 * @date 2019-01-31 15:58
 **/
@Slf4j
@Configuration
public abstract class DefaultConsumerConfigure {

    @Value("${rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @Value("${rocketmq.consumer.groupName}")
    private String groupName;

    // 开启消费者监听服务
    public void listener(String topic, String tag) throws MQClientException {
        log.info("开启" + topic + ":" + tag + "消费者-------------------");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.subscribe(topic, tag);
        // 开启内部类实现监听
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> DefaultConsumerConfigure.this.dealBody(msgs));
        consumer.start();
        log.info("rocketmq启动成功---------------------------------------");
    }

    /**
     * 处理dealBody
     *
     * @param msgs
     * @return
     */
    public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs);
}
